package com.kool.kmqtt.server.processer;

import com.kool.kmqtt.server.PacketSender;
import com.kool.kmqtt.server.ServerConfig;
import com.kool.kmqtt.server.constant.PacketTypeEnum;
import com.kool.kmqtt.server.encoder.ConnackPacketEncoder;
import com.kool.kmqtt.server.encoder.PublishPacketEncoder;
import com.kool.kmqtt.server.encoder.PubrelPacketEncoder;
import com.kool.kmqtt.server.exception.AppException;
import com.kool.kmqtt.server.exception.ErrorCode;
import com.kool.kmqtt.server.exception.ProtocolException;
import com.kool.kmqtt.server.packet.*;
import com.kool.kmqtt.server.parser.PacketParser;
import com.kool.kmqtt.server.repository.RepositoryFactory;
import com.kool.kmqtt.server.session.WillStatus;
import com.kool.kmqtt.service.KauthService;
import com.kool.kmqtt.service.request.UserAuthReq;
import com.kool.kmqtt.service.vo.UserAuthResp;
import com.kool.kmqtt.util.SpringUtil;
import com.kool.kmqtt.util.StringUtil;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

import java.util.Date;
import java.util.List;

/**
 * CONNECT报文处理器
 */
@Slf4j
public class ConnectPacketProcessor extends PacketProcessor {
    public ConnectPacketProcessor(ChannelHandlerContext ctx, PacketParser packetParser) {
        super(ctx, packetParser);
    }

    @Override
    protected void validate(Packet packet) {
        ConnectVariableHeader connectVariableHeader = (ConnectVariableHeader) packet.getVariableHeader();

        if (!"MQTT".equals(connectVariableHeader.getProtocolName())) {
            //非MQTT协议
            throw new ProtocolException(ErrorCode.NOT_MQTT_PROTOCOL);
        }
        if (connectVariableHeader.getProtocolLevel() != 4) {
            //如果发现不支持的协议级别，服务端必须给发送一个返回码为0x01（不支持的协议级别）的CONNACK报文响应
            sendConnack(1);
            throw new ProtocolException(ErrorCode.UNSUPPORTED_PROTOCOL_LEVEL);
        }

        if (connectVariableHeader.getReserved() != 0) {
            //CONNECT报文的connect flags保留标志位（第0位）不为0
            throw new ProtocolException(ErrorCode.CONNECT_FLAGS_RESERVED_NOT_ZERO);
        }

        if ((!connectVariableHeader.isUserNameFlag() && connectVariableHeader.isPasswordFlag())) {
            //如果用户名标志被设置为0，密码标志也必须设置为0
            throw new ProtocolException(ErrorCode.USER_PASSWORD_FLAG_ERROR);
        }
        //用户鉴权
        if (ServerConfig.getInstance().getUserAuthSwitch()) {
            ConnectPayload connectPayload = (ConnectPayload) packet.getPayload();
            //用户名
            String userName = connectPayload.getUserName();
            if (StringUtil.isEmpty(userName)) {
                throw new AppException(ErrorCode.USERNAME_NULL);
            }
            //密码
            String password = connectPayload.getPassword();
            if (StringUtil.isEmpty(password)) {
                throw new AppException(ErrorCode.PASSWORD_NULL);
            }

            UserAuthReq request = new UserAuthReq();
            request.setUserName(userName);
            request.setPwd(password);

            KauthService kauthService = SpringUtil.getBean(KauthService.class);
            UserAuthResp resp = kauthService.userAuth(request);
            if (resp.getIsSuccess() != 1) {
                //错误次数
                int errTimes = resp.getErrorTimes();
                String errMsg = resp.getErrorMsg();
                throw new AppException(ErrorCode.USER_PASSWORD_AUTH_ERROR, Integer.toString(errTimes), errMsg);
            }
        }

    }

    @Override
    protected void processPacket(Packet packet) {
        ConnectVariableHeader connectVariableHeader = (ConnectVariableHeader) packet.getVariableHeader();

        //清理会话标志
        boolean cleanSession = connectVariableHeader.isCleanSession();
        //保持连接,以秒为单位
        int keepAlive = connectVariableHeader.getKeepAlive();
        //载荷
        ConnectPayload payload = (ConnectPayload) packet.getPayload();
        //客户端id
        String clientId = payload.getClientId();
        log.debug("收到客户端[{}]的CONNECT", clientId);

        //遗嘱信息
        WillStatus willStatus = new WillStatus();
        willStatus.setClientId(clientId);
        willStatus.setWillFlag(connectVariableHeader.isWillFlag());
        willStatus.setWillQoS(connectVariableHeader.getWillQoS());
        willStatus.setWillRetain(connectVariableHeader.isWillRetain());
        willStatus.setWillTopic(payload.getWillTopic());
        willStatus.setWillMessage(payload.getWillMessage());

        //更新会话上下文
        sessionContext.setUserName(payload.getUserName());
        sessionContext.setClientId(clientId);
        sessionContext.setCleanSession(cleanSession);
        sessionContext.setKeepAlive(keepAlive);
        sessionContext.setLastKeepAliveTime(new Date());
        /**
         * 如果清理会话（CleanSession）标志被设置为0，服务端必须基于当前会话（使用客户端标识符识别）的状态恢复与客户端的通信。
         * 如果没有与这个客户端标识符关联的会话，服务端必须创建一个新的会话。在连接断开之后，当连接断开后，客户端和服务端必须保存会话信息 [MQTT-3.1.2-4]。
         * 当清理会话标志为0的会话连接断开之后，服务端必须将之后的QoS 1和QoS 2级别的消息保存为会话状态的一部分，如果这些消息匹配断开连接时客户端的任何订阅 [MQTT-3.1.2-5]。
         * 服务端也可以保存满足相同条件的QoS 0级别的消息。
         *
         * 如果清理会话（CleanSession）标志被设置为1，客户端和服务端必须丢弃之前的任何会话并开始一个新的会话。
         * 会话仅持续和网络连接同样长的时间。
         * 与这个会话关联的状态数据不能被任何之后的会话重用 [MQTT-3.1.2-6]。
         */
        if (!cleanSession) {
            //更新会话状态的遗嘱信息
            repository.saveWillStatus(clientId, willStatus);
        } else {
            //删除会话状态
            repository.deleteSessionStatus(clientId);
            //更新会话状态的遗嘱信息
            repository.saveWillStatus(clientId, willStatus);
        }

        //查询出站未确认的PUBLISH报文和PUBREL报文
        List<Packet> sendPackets = RepositoryFactory.getRepository().getSendPackets(clientId);

        //重发未确认的PUBLISH报文和PUBREL报文
        reSend(sendPackets);
        //发送CONNACK报文
        sendConnack(0);
    }

    /**
     * 重发未确认的PUBLISH报文和PUBREL报文
     * 重发不做主题权限验证
     *
     * @param sendPackets
     */
    private void reSend(List<Packet> sendPackets) {
        if (sendPackets != null) {
            for (Packet packet : sendPackets) {
                if (packet != null) {
                    int packetType = packet.getFixedHeader().getPacketType();
                    if (PacketTypeEnum.PUBLISH.getCode() == packetType) {
                        //创建报文发送器
                        PacketSender packetSender = new PacketSender(sessionContext, new PublishPacketEncoder());
                        //更新重发标志
                        packet.getFixedHeader().setDup(true);
                        packetSender.send(packet);

                    } else if (PacketTypeEnum.PUBREL.getCode() == packetType) {
                        //创建报文发送器
                        PacketSender packetSender = new PacketSender(sessionContext, new PubrelPacketEncoder());
                        packetSender.send(packet);

                    }
                }
            }
        }
    }

    /**
     * 发送CONNACK报文
     *
     * @param returnCode
     */
    private Packet sendConnack(int returnCode) {
        //CONNACK固定报头
        FixedHeader fixedHeader = new FixedHeader();
        fixedHeader.setPacketType(PacketTypeEnum.CONNACK.getCode());
        fixedHeader.setFlags(0);
        fixedHeader.setRemainingLength(2);
        //CONNACK可变报头
        ConnackVariableHeader connackVariableHeader = new ConnackVariableHeader();
        connackVariableHeader.setSessionPresent(1);
        connackVariableHeader.setConnectReturnCode(returnCode);
        //CONNACK报文
        Packet connack = new Packet();
        connack.setFixedHeader(fixedHeader);
        connack.setVariableHeader(connackVariableHeader);
        //创建报文发送器
        PacketSender packetSender = new PacketSender(sessionContext, new ConnackPacketEncoder());
        //发送CONNACK报文
        packetSender.send(connack);
        return connack;
    }
}
